package com.huan.hadoop.mr;

import com.google.common.base.Joiner;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Map-side join mapper: replaces the product id in each order record with the product name.
 *
 * <p>The product lookup table is read once per task in {@code setup} from the first
 * cache file attached to the job; {@code map} then rewrites every order line using
 * that in-memory table and emits the rewritten line as the output key.
 *
 * @author huan.fu
 * @date 2023/7/16 - 15:06
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Splits on runs of whitespace; compiled once instead of once per record. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");
    /** Joins output fields with a tab; Joiner instances are immutable and reusable. */
    private static final Joiner TAB_JOINER = Joiner.on("\t");
    /** Counter group used to report skipped or unmatched records. */
    private static final String COUNTER_GROUP = "MapJoin";

    /** product id -> product name, filled in {@link #setup}. */
    private final Map<String, String> productTypeMap = new HashMap<>(16);
    /** Reused output key to avoid allocating a new Text per record. */
    private final Text outKey = new Text();

    /**
     * Loads the product data from the first cache file into {@code productTypeMap}
     * before any record is mapped.
     *
     * <p>Each non-blank line must contain at least two whitespace-separated fields:
     * the product id followed by the product name (e.g. {@code 01 小米}). Blank lines
     * are ignored.
     *
     * @param context task context supplying the cache files and the configuration
     * @throws IOException if no cache file is attached to the job, the file cannot be
     *                     read, or a non-blank line has fewer than two fields
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Cache files are expected to be registered by the job driver
        // (MapJoinDriver.java according to the original author's note).
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException("No cache file configured for the job; the product file is required for the map join");
        }
        // The first cache file is the product file (product.txt).
        Path productPath = new Path(cacheFiles[0]);
        // Resolve the file system from the path itself so that a cache URI whose scheme
        // differs from the default file system is still opened correctly.
        // The returned FileSystem is not closed here: Hadoop may hand out a shared instance.
        FileSystem fileSystem = productPath.getFileSystem(context.getConfiguration());
        try (FSDataInputStream is = fileSystem.open(productPath);
             BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                // [01, 小米] ...
                String[] cells = WHITESPACE.split(trimmed);
                if (cells.length < 2) {
                    throw new IOException("Malformed product line in " + productPath
                            + " (expected '<id> <name>'): " + line);
                }
                productTypeMap.put(cells[0], cells[1]);
            }
        }
    }

    /**
     * Rewrites one order line, replacing its second field (the product id) with the
     * product name, and writes the tab-joined result as the output key.
     *
     * <p>Blank lines and lines with fewer than two fields are skipped and counted under
     * {@code MapJoin/MALFORMED_ORDER_LINES}. When the product id has no entry in the
     * lookup table the id is left unchanged in the output and the record is counted
     * under {@code MapJoin/UNKNOWN_PRODUCT_ID}, rather than failing the task.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one order line, e.g. {@code 1001	01	1}
     * @param context task context used to emit output and update counters
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // One order record, e.g. (1001	01	1)
        String line = value.toString().trim();
        if (line.isEmpty()) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_ORDER_LINES").increment(1);
            return;
        }
        // Split into fields: [1001, 01, 1]
        String[] cells = WHITESPACE.split(line);
        if (cells.length < 2) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_ORDER_LINES").increment(1);
            return;
        }
        // Replace the second field with the product name.
        String productName = productTypeMap.get(cells[1]);
        if (productName == null) {
            // Joiner rejects null elements, so keep the raw id instead of crashing the task.
            context.getCounter(COUNTER_GROUP, "UNKNOWN_PRODUCT_ID").increment(1);
        } else {
            cells[1] = productName;
        }
        // Build the output key and emit it.
        outKey.set(TAB_JOINER.join(cells));
        context.write(outKey, NullWritable.get());
    }
}
